package com.flink.examples.functions;

import com.flink.examples.DataSource;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;
import java.util.List;

/**
 * @Description Connect算子：功能与union类似，将两个流（union支持两个或以上）合并为一个流，但区别在于connect不要求数据类型一致
 * @Author JL
 * @Date 2020/09/17
 * @Version V1.0
 */
public class Connect {

    /**
     * 将两个不区分数据类型的数据流合并成一个数据流，并打印
     *
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        //dataStream 1
        DataStream<Tuple3<String, String, Integer>> dataStream1 = env.fromCollection(tuple3List);
        //dataStream 2
        DataStream<Tuple3<String, String, Integer>> dataStream2 = env.fromCollection(Arrays.asList(
                new Tuple3<>("医生", "上海", 2),
                new Tuple3<>("老师", "北京", 4),
                new Tuple3<>("工人", "广州", 9)
        ));
        //合关两个数据流
        DataStream<Tuple4<String, String, Integer, String>> dataStream = dataStream1.connect(dataStream2)
                .map(new CoMapFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple4<String, String, Integer, String>>() {
                    //表示dataStream1的流输入
                    @Override
                    public Tuple4<String, String, Integer, String> map1(Tuple3<String, String, Integer> value) throws Exception {
                        return Tuple4.of(value.f0, value.f1, value.f2, "用户");
                    }
                    //表示dataStream2的流输入
                    @Override
                    public Tuple4<String, String, Integer, String> map2(Tuple3<String, String, Integer> value) throws Exception {
                        return Tuple4.of(value.f0, value.f1, value.f2, "职业");
                    }
                });

        //打印
        dataStream.print();
        env.execute("flink Split job");
    }

}
/*
(张三,man,20,用户)
(李四,girl,24,用户)
(王五,man,29,用户)
(刘六,girl,32,用户)
(伍七,girl,18,用户)
(吴八,man,30,用户)
(医生,上海,2,职业)
(老师,北京,4,职业)
(工人,广州,9,职业)
 */